---
title: DASE Components Explained (E-Commerce Recommendation)
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<%= partial 'shared/dase/dase', locals: { template_name: 'E-Commerce Recommendation Engine Template' } %>

## The Engine Design

As you can see from the Quick Start, *MyECommerceRecommendation* takes a JSON prediction
query, e.g. `{ "user": "u1", "num": 4 }`, and returns a JSON predicted result.
In MyECommerceRecommendation/src/main/scala/***Engine.scala***, the `Query` case class
defines the format of such **query**:

```scala
case class Query(
  user: String,
  num: Int,
  categories: Option[Set[String]],
  whiteList: Option[Set[String]],
  blackList: Option[Set[String]]
) extends Serializable
```
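As a minimal sketch of how JSON queries correspond to `Query` values, assuming that optional fields omitted from the JSON are deserialized as `None`. The item ID `i3` and the category names are made-up illustration values, and the case class is redeclared locally so the snippet is self-contained:

```scala
object QuerySketch {
  // Same shape as the template's Query case class.
  case class Query(
    user: String,
    num: Int,
    categories: Option[Set[String]],
    whiteList: Option[Set[String]],
    blackList: Option[Set[String]]
  )

  // { "user": "u1", "num": 4 }
  // No optional field is supplied (assumed to become None).
  val minimal = Query("u1", 4, None, None, None)

  // { "user": "u1", "num": 4, "categories": ["c1", "c2"], "blackList": ["i3"] }
  val filtered = Query(
    user = "u1",
    num = 4,
    categories = Some(Set("c1", "c2")),
    whiteList = None,
    blackList = Some(Set("i3")))
}
```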

The `PredictedResult` case class defines the format of **predicted result**,
such as

```json
{"itemScores":[
  {"item":"22","score":4.07},
  {"item":"62","score":4.05},
  {"item":"75","score":4.04},
  {"item":"68","score":3.81}
]}
```

with:

```scala
case class PredictedResult(
  itemScores: Array[ItemScore]
) extends Serializable

case class ItemScore(
  item: String,
  score: Double
) extends Serializable
```

Finally, `ECommerceRecommendationEngine` is the *Engine Factory* that defines the
components this engine will use: Data Source, Data Preparator, Algorithm(s) and
Serving components.

```scala
object ECommerceRecommendationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("ecomm" -> classOf[ECommAlgorithm]),
      classOf[Serving])
  }
}
```

### Spark MLlib

The PredictionIO E-Commerce Recommendation Engine Template integrates Spark's MLlib ALS algorithm under the DASE
architecture. We will take a closer look at the DASE code below.

The MLlib ALS algorithm takes training data as an RDD, i.e. `RDD[Rating]`, and trains a model, which is a `MatrixFactorizationModel` object.

See the [MLlib collaborative filtering guide](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) to learn more about MLlib's ALS collaborative filtering algorithm.


## Data

In the DASE architecture, data is prepared by two components in sequence: *DataSource* and *DataPreparator*. They read data
from the data store and prepare it for the Algorithm.

### Data Source

In MyECommerceRecommendation/src/main/scala/***DataSource.scala***, the `readTraining`
method of class `DataSource` reads and selects data from the *Event Store*
(data store of the *Event Server*). It returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // create a RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(...) ...

    // create a RDD of (entityID, Item)
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(...) ...

    // get all "user" "view" or "buy" "item" events from event store
    val eventsRDD: RDD[Event] = PEventStore.find(...) ...

    // filter all view events
    val viewEventsRDD: RDD[ViewEvent] = eventsRDD.filter { ... } ...

    // filter all buy events
    val buyEventsRDD: RDD[BuyEvent] = eventsRDD.filter { ...} ...

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD,
      buyEvents = buyEventsRDD
    )
  }
}
```

PredictionIO automatically loads the parameters of *datasource* specified in MyECommerceRecommendation/***engine.json***, including *appName*, to `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```

In `readTraining()`, `PEventStore` is an object which provides functions to access the data collected by the PredictionIO Event Server.

This E-Commerce Recommendation Engine Template requires "user" and "item" entities that are set by events.

`PEventStore.aggregateProperties(...)` aggregates the properties of the `user` and `item` entities that are set, unset, or deleted by the special events **$set**, **$unset** and **$delete**. Please refer to [Event API](/datacollection/eventapi/#note-about-properties) for more details on using these events.
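To make the aggregation semantics concrete, here is a simplified sketch on plain Scala collections. It is not the PredictionIO implementation: `PropEvent` is a hypothetical stand-in type, property values are reduced to plain strings, and the `color` property is made up for illustration. It only assumes that events are replayed in time order, that **$set** merges properties, **$unset** removes the named ones, and **$delete** removes the entity:

```scala
object AggregatePropertiesSketch {
  // Hypothetical, simplified property event; not a PredictionIO class.
  case class PropEvent(
    event: String,
    entityId: String,
    props: Map[String, String],
    t: Long)

  // Replay the events in time order and keep the latest properties per entity.
  def aggregate(events: Seq[PropEvent]): Map[String, Map[String, String]] =
    events.sortBy(_.t).foldLeft(Map.empty[String, Map[String, String]]) { (acc, e) =>
      e.event match {
        case "$set" =>
          val current = acc.getOrElse(e.entityId, Map.empty[String, String])
          acc.updated(e.entityId, current ++ e.props)
        case "$unset" =>
          acc.get(e.entityId) match {
            case Some(current) => acc.updated(e.entityId, current -- e.props.keys)
            case None => acc
          }
        case "$delete" => acc - e.entityId
        case _ => acc // ignore any other event
      }
    }

  val events = Seq(
    PropEvent("$set", "i1", Map("categories" -> "c1", "color" -> "red"), 1L),
    PropEvent("$unset", "i1", Map("color" -> ""), 2L),
    PropEvent("$set", "i2", Map("categories" -> "c2"), 3L),
    PropEvent("$delete", "i2", Map.empty, 4L))

  // Only i1 survives, with its remaining "categories" property.
  val result = aggregate(events)
}
```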

The following code aggregates the properties of `user` and then maps each result to a `User()` object.

```scala

  // create a RDD of (entityID, User)
  val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
    appName = dsp.appName,
    entityType = "user"
  )(sc).map { case (entityId, properties) =>
    val user = try {
      User()
    } catch {
      case e: Exception => {
        logger.error(s"Failed to get properties ${properties} of" +
          s" user ${entityId}. Exception: ${e}.")
        throw e
      }
    }
    (entityId, user)
  }.cache()

```
In the template, the `User()` object is an empty placeholder for you to customize and expand.


Similarly, the following code aggregates `item` properties and then maps each result to an `Item()` object. By default, this template assumes each item has an optional property `categories`, which is a list of Strings.

```scala
  // create a RDD of (entityID, Item)
  val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
    appName = dsp.appName,
    entityType = "item"
  )(sc).map { case (entityId, properties) =>
    val item = try {
      // Assume categories is optional property of item.
      Item(categories = properties.getOpt[List[String]]("categories"))
    } catch {
      case e: Exception => {
        logger.error(s"Failed to get properties ${properties} of" +
          s" item ${entityId}. Exception: ${e}.")
        throw e
      }
    }
    (entityId, item)
  }.cache()
```

The `Item` case class is defined as

```scala
case class Item(categories: Option[List[String]])
```

`PEventStore.find(...)` specifies the events that you want to read. In this case, "user view item" and "user buy item" events are read:

```scala

  // get all "user" "view" or "buy" "item" events
  val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view", "buy")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      .cache()

```

Note that `.cache()` is used to cache the RDD data in memory, since `eventsRDD` will be used multiple times later.

Then we filter the events we are interested in and map each event to a `ViewEvent` object.

```scala

  val viewEventsRDD: RDD[ViewEvent] = eventsRDD
      .filter { event => event.event == "view" }
      .map { event =>
        try {
          ViewEvent(
            user = event.entityId,
            item = event.targetEntityId.get,
            t = event.eventTime.getMillis
          )
        } catch {
          case e: Exception =>
            logger.error(s"Cannot convert ${event} to ViewEvent." +
              s" Exception: ${e}.")
            throw e
        }
      }
```

`ViewEvent` case class is defined as:

```scala
case class ViewEvent(user: String, item: String, t: Long)
```

We filter the buy events in a similar way and map each one to a `BuyEvent` object for later use.


```scala

  val buyEventsRDD: RDD[BuyEvent] = eventsRDD
      .filter { event => event.event == "buy" }
      .map { event =>
        try {
          BuyEvent(
            user = event.entityId,
            item = event.targetEntityId.get,
            t = event.eventTime.getMillis
          )
        } catch {
          case e: Exception =>
            logger.error(s"Cannot convert ${event} to BuyEvent." +
              s" Exception: ${e}.")
            throw e
        }
      }

```

`BuyEvent` case class is defined as:

```scala
case class BuyEvent(user: String, item: String, t: Long)
```

INFO: For flexibility, this template is designed to support user IDs and item IDs as Strings.

`TrainingData` contains RDDs of `User`, `Item`, `ViewEvent` and `BuyEvent` objects. The class definition of `TrainingData` is:

```scala
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val buyEvents: RDD[BuyEvent]
) extends Serializable { ... }
```

PredictionIO then passes the returned `TrainingData` object to *Data Preparator*.

NOTE: You could modify the DataSource to read events other than the default **view** and **buy**.

### Data Preparator

In MyECommerceRecommendation/src/main/scala/***Preparator.scala***, the `prepare` method
of class `Preparator` takes `TrainingData` as its input and performs any
necessary feature selection and data processing tasks. At the end, it returns
`PreparedData` which should contain the data *Algorithm* needs.

By default, `prepare` simply copies the unprocessed `TrainingData` data to `PreparedData`:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents,
      buyEvents = trainingData.buyEvents)
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val buyEvents: RDD[BuyEvent]
) extends Serializable
```

PredictionIO passes the returned `PreparedData` object to Algorithm's `train` function.

## Algorithm

In MyECommerceRecommendation/src/main/scala/***ECommAlgorithm.scala***, the two methods of
the algorithm class are `train` and `predict`. `train` is responsible for
training the predictive model; `predict` is
responsible for using this model to make predictions.


### Algorithm parameters

The ECommAlgorithm takes the following parameters, as defined by the `ECommAlgorithmParams` case class:

```scala
case class ECommAlgorithmParams(
  appName: String,
  unseenOnly: Boolean,
  seenEvents: List[String],
  similarEvents: List[String],
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]
) extends Params
```

Parameter description:

- **appName**: Your App name. Events defined by "seenEvents" and "similarEvents" will be read from this app during `predict`.
- **unseenOnly**: true or false. Set to true if you want to recommend unseen items only. An item is treated as *seen* if the user has performed any of the *seenEvents* on it.
- **seenEvents**: A list of user-to-item events which will be treated as *seen* events. Used when *unseenOnly* is set to true.
- **similarEvents**: A list of user-to-item events. The items on which the user has performed these events are used to find similar items.
- **rank**: Parameter of the MLlib ALS algorithm. Number of latent features.
- **numIterations**: Parameter of the MLlib ALS algorithm. Number of iterations.
- **lambda**: Regularization parameter of the MLlib ALS algorithm.
- **seed**: Optional. A random seed for the MLlib ALS algorithm. Specify a fixed value if you want deterministic results.

### train(...)

`train` is called when you run **pio train**. This is where the MLlib ALS algorithm,
i.e. `ALS.trainImplicit()`, is used to train a predictive model. In addition, we count how many times each item has been bought. These counts form a default model, which is used during `predict` when the user has no ALS feature vector and no other useful information about the user is available.

```scala

  def train(sc: SparkContext, data: PreparedData): ECommModel = {
    ...

    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    // generate MLlibRating data for ALS algorithm
    val mllibRatings: RDD[MLlibRating] = genMLlibRating(
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap,
      data = data
    )

    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    ...

    // count how often each item is bought, to recommend popular items as the default case
    val popularCount = trainDefault(
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap,
      data = data
    )
    ...

  }

```
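The body of `trainDefault()` is not shown on this page. As a rough idea of what counting buys per item could look like, here is a sketch on plain Scala collections rather than RDDs. The function name `popularCount` and the sample data are illustrative assumptions, not the template's code:

```scala
object PopularCountSketch {
  case class BuyEvent(user: String, item: String, t: Long)

  // Count buy events per item index; buys of unknown items are skipped.
  def popularCount(
    buys: Seq[BuyEvent],
    itemIndex: Map[String, Int]): Map[Int, Int] =
    buys
      .flatMap(b => itemIndex.get(b.item))
      .groupBy(identity)
      .map { case (index, occurrences) => index -> occurrences.size }

  val counts = popularCount(
    buys = Seq(
      BuyEvent("u1", "i1", 1L),
      BuyEvent("u2", "i1", 2L),
      BuyEvent("u2", "i2", 3L),
      BuyEvent("u3", "i9", 4L)), // i9 has no index and is ignored
    itemIndex = Map("i1" -> 0, "i2" -> 1))
}
```

Items that were never bought do not appear in the result, so a lookup should supply a default such as 0.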

#### Working with Spark MLlib's ALS.trainImplicit(....)

MLlib ALS does not support `String` user IDs and item IDs, so `ALS.trainImplicit` expects `Rating` objects with Integer IDs only. First, you can rename MLlib's Integer-only `Rating` to `MLlibRating` for clarity:

```scala
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
```

In order to use MLlib's ALS algorithm, we need to convert the `viewEvents` into `MLlibRating`. There are two things we need to handle:

1. Map the user and item String IDs of each `ViewEvent` to Integer IDs, as required by `MLlibRating`.
2. A `ViewEvent` is an implicit event that does not carry an explicit rating value. `ALS.trainImplicit()` supports implicit preference: a higher rating value in `MLlibRating` means higher confidence that the user prefers the item. Hence we can count how many times the user has viewed the item and use that count as the confidence level.

You create a bi-directional map with `BiMap.stringInt` which maps each String record to an Integer index.

```scala
val userStringIntMap = BiMap.stringInt(data.users.keys)
val itemStringIntMap = BiMap.stringInt(data.items.keys)
```
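Conceptually, such a bi-directional map can be sketched with two plain Scala `Map`s. This is not PredictionIO's `BiMap` implementation, and the actual index values it assigns may differ. The sketch only shows the idea of a String-to-Int index, its inverse, and the `-1` default for unknown IDs:

```scala
object StringIntIndexSketch {
  // Assign each distinct String ID an Int index.
  def stringInt(keys: Seq[String]): Map[String, Int] =
    keys.distinct.zipWithIndex.toMap

  val itemStringIntMap: Map[String, Int] =
    stringInt(Seq("i1", "i2", "i3", "i2"))

  // Inverse lookup, used to turn Int indices back into String IDs.
  val itemIntStringMap: Map[Int, String] = itemStringIntMap.map(_.swap)

  val index: Int = itemStringIntMap("i2")
  val roundTrip: String = itemIntStringMap(index)

  // Unknown IDs fall back to -1, mirroring the default used in the template.
  val missing: Int = itemStringIntMap.getOrElse("unknown", -1)
}
```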

Then convert the user and item String ID in each ViewEvent to Int with these BiMaps. We use default -1 if the user or item String ID couldn't be found in the BiMap and filter out these events with invalid user and item ID later. After filtering, we use `reduceByKey()` to add up all values for the same key (uindex, iindex) and then finally map to `MLlibRating` object. You can find the code inside the function `genMLlibRating()`:

```scala

  def genMLlibRating(
    userStringIntMap: BiMap[String, Int],
    itemStringIntMap: BiMap[String, Int],
    data: PreparedData): RDD[MLlibRating] = {

    val mllibRatings = data.viewEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        ((uindex, iindex), 1)
      }
      .filter { case ((u, i), v) =>
        // keep events with valid user and item index
        (u != -1) && (i != -1)
      }
      .reduceByKey(_ + _) // aggregate all view events of same user-item pair
      .map { case ((u, i), v) =>
        // MLlibRating requires integer index for user and item
        MLlibRating(u, i, v)
      }
      .cache()

    mllibRatings
  }

```

NOTE: You can customize this function if you want to convert other events to MLlibRating or need different ways to aggregate the events into MLlibRating.
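To see the effect of the aggregation on a small input, the same steps can be sketched on plain Scala collections. `groupBy` stands in for `reduceByKey`, and the local `Rating` case class is a stand-in with the same fields as MLlib's `Rating`. The sample events are made up:

```scala
object ViewCountSketch {
  case class ViewEvent(user: String, item: String, t: Long)
  // Local stand-in with the same fields as MLlib's Rating.
  case class Rating(user: Int, product: Int, rating: Double)

  def genRatings(
    views: Seq[ViewEvent],
    userIndex: Map[String, Int],
    itemIndex: Map[String, Int]): Seq[Rating] =
    views
      // convert String IDs to Int indices, using -1 for unknown IDs
      .map(v => (userIndex.getOrElse(v.user, -1), itemIndex.getOrElse(v.item, -1)))
      // keep only events with a valid user and item index
      .filter { case (u, i) => u != -1 && i != -1 }
      // count the views of each user-item pair
      .groupBy(identity)
      .map { case ((u, i), occurrences) => Rating(u, i, occurrences.size.toDouble) }
      .toSeq

  val ratings = genRatings(
    views = Seq(
      ViewEvent("u1", "i1", 1L),
      ViewEvent("u1", "i1", 2L),
      ViewEvent("u1", "i2", 3L),
      ViewEvent("u9", "i1", 4L)), // u9 is unknown and gets dropped
    userIndex = Map("u1" -> 0),
    itemIndex = Map("i1" -> 0, "i2" -> 1))
}
```

Here user `u1` viewed `i1` twice, so that pair ends up with a rating of 2.0, while the single view of `i2` yields 1.0.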

In addition to `RDD[MLlibRating]`, `ALS.trainImplicit` takes the following configurable parameters: *rank*, *iterations*, *lambda* and *seed*. The template fixes *blocks* to -1 and *alpha* to 1.0, as shown in `train` above.

The values of these parameters are specified in *algorithms* of
MyECommerceRecommendation/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "ecomm",
      "params": {
        "appName": "MyApp1",
        "unseenOnly": true,
        "seenEvents": ["buy", "view"],
        "similarEvents" : ["view"],
        "rank": 10,
        "numIterations" : 20,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
  ...
}
```

The parameters `appName`, `unseenOnly`, `seenEvents` and `similarEvents` are used during `predict()`, which will be explained later.

PredictionIO automatically loads these values into the constructor parameter `ap`,
which has a corresponding case class `ECommAlgorithmParams`.

The `seed` parameter is optional. MLlib's ALS algorithm uses it internally to generate random values. If `seed` is not specified, the current system time (`System.nanoTime`) is used, so each training run may produce different results. Specify a fixed value for `seed` if you want deterministic results (for example, when you are testing).

`ALS.trainImplicit()` returns a `MatrixFactorizationModel` model which contains two RDDs: userFeatures and productFeatures. They correspond to the user X latent features matrix and item X latent features matrix, respectively.
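In a matrix factorization model, the predicted score of an item for a user is the dot product of the user's feature vector and the item's feature vector. The following sketch illustrates that idea on plain Scala collections. The helper names `dot` and `topN` and the sample vectors are illustrative assumptions, not the template's code:

```scala
object DotProductScoreSketch {
  // Dot product of two feature vectors of the same rank.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "feature vectors must have the same rank")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Score every product for one user and keep the n highest scores.
  def topN(
    user: Array[Double],
    products: Map[Int, Array[Double]],
    n: Int): Seq[(Int, Double)] =
    products.toSeq
      .map { case (index, features) => (index, dot(user, features)) }
      .sortBy { case (_, score) => -score }
      .take(n)

  val user = Array(1.0, 0.5)
  val products = Map(
    0 -> Array(0.2, 0.4),
    1 -> Array(1.0, 0.0),
    2 -> Array(0.0, 1.0))

  // Product 1 scores highest, followed by product 2.
  val best = topN(user, products, 2)
}
```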

In addition to the latent feature vectors, the item properties (e.g. categories) and the popular counts are also used during `predict()`. Hence, we save this data along with the feature vectors by joining them and then collecting the result as a local Map. Each item is represented by a `ProductModel` class, which consists of the `item` information, the `features` calculated by ALS, and the `count` returned by `trainDefault()`.

```scala

case class ProductModel(
  item: Item,
  features: Option[Array[Double]], // features by ALS
  count: Int // popular count for default score
)

```

```scala
    // join item with the trained productFeatures
    val productFeatures: Map[Int, (Item, Option[Array[Double]])] =
      items.leftOuterJoin(m.productFeatures).collectAsMap.toMap

    ...

    val productModels: Map[Int, ProductModel] = productFeatures
      .map { case (index, (item, features)) =>
        val pm = ProductModel(
          item = item,
          features = features,
          // NOTE: use getOrElse because popularCount may not contain all items.
          count = popularCount.getOrElse(index, 0)
        )
        (index, pm)
      }

    new ECommModel(
      rank = m.rank,
      userFeatures = userFeatures,
      productModels = productModels,
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap
    )

```

Note that `leftOuterJoin` is used because the productFeatures returned by ALS may not contain all items.
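The effect of a left outer join can be sketched with local `Map`s instead of RDDs. The generic helper below is written for this illustration only. Every item on the left is kept, and items without trained features get `None`:

```scala
object LeftOuterJoinSketch {
  case class Item(categories: Option[List[String]])

  // Keep every key of `left`; the value from `right` is optional.
  def leftOuterJoin[K, A, B](
    left: Map[K, A],
    right: Map[K, B]): Map[K, (A, Option[B])] =
    left.map { case (k, a) => k -> ((a, right.get(k))) }

  val items = Map(
    0 -> Item(Some(List("c1"))),
    1 -> Item(None))

  // Item 1 has no trained features, e.g. because nobody viewed it.
  val features = Map(0 -> Array(0.1, 0.2))

  val joined = leftOuterJoin(items, features)
}
```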

The `ECommModel` is defined as the following:

```scala
class ECommModel(
  val rank: Int,
  val userFeatures: Map[Int, Array[Double]],
  val productModels: Map[Int, ProductModel],
  val userStringIntMap: BiMap[String, Int],
  val itemStringIntMap: BiMap[String, Int]
) extends Serializable  { ... }
```

PredictionIO will automatically store the returned model after training, i.e. `ECommModel` in this example.

### predict(...)

`predict` is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "user": "u1", "num": 4 }`, to the `Query` class you defined previously.

We can use the `userFeatures` and `productModels` stored in `ECommModel` to calculate the scores of items for the user.

This template also supports additional business logic features, such as filtering items by categories, recommending items in the white list, excluding items in the black list, recommending unseen items only, and excluding unavailable items defined in a constraint event.

The `predict()` function does the following:

1. Convert the items in the query's whiteList from String IDs to integer indices.
2. Get the list of items seen by the user (defined by the `seenEvents` parameter).
3. Get the latest unavailableItems, which are excluded for all users.
4. Combine the query's blackList, seenItems, and unavailableItems into a final black list of items to be excluded from recommendation.
5. Get the user feature vector from the ECommModel.
6. If there is a feature vector for the user, recommend the top N items based on the user features and product features.
7. If there is no feature vector for the user, use the items the user recently acted on (defined by the `similarEvents` parameter) to recommend similar items.
8. If there are no recent `similarEvents` available for the user, recommend popular items (added in template version 0.4.0).

Only items which satisfy the `isCandidate()` condition will be recommended. By default, an item can be recommended if:

- it belongs to one of the categories defined in the query, if categories are defined.
- it is one of the white list items, if a white list is defined.
- it is not in the black list.

INFO: You can modify the `isCandidate()` check or related logic if you have different requirements for deciding whether an item is a candidate for recommendation.
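The three conditions above could be expressed roughly as follows. This is a sketch, not the template's `isCandidate()` source: the parameter list is an assumption, and so is the choice that an item without categories never matches a category filter:

```scala
object IsCandidateSketch {
  case class Item(categories: Option[List[String]])

  def isCandidate(
    index: Int,
    item: Item,
    categories: Option[Set[String]],
    whiteList: Option[Set[Int]],
    blackList: Set[Int]): Boolean = {
    // No white list means every item passes this check.
    val inWhiteList = whiteList.forall(_.contains(index))
    val notBlackListed = !blackList.contains(index)
    // No category filter means every item passes this check.
    // Assumption: an item without categories fails a category filter.
    val categoryMatch = categories.forall { wanted =>
      item.categories.exists(_.exists(wanted.contains))
    }
    inWhiteList && notBlackListed && categoryMatch
  }

  val shirt = Item(Some(List("c1")))
  val untagged = Item(None)

  val matchesCategory = isCandidate(0, shirt, Some(Set("c1")), None, Set.empty)
  val wrongCategory = isCandidate(0, shirt, Some(Set("c2")), None, Set.empty)
  val noCategories = isCandidate(1, untagged, Some(Set("c1")), None, Set.empty)
  val notWhiteListed = isCandidate(1, untagged, None, Some(Set(0)), Set.empty)
  val blackListed = isCandidate(0, shirt, None, None, Set(0))
}
```

Only `matchesCategory` is true here; each of the other calls fails exactly one of the three conditions.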

```scala

  def predict(model: ECommModel, query: Query): PredictedResult = {

    val userFeatures = model.userFeatures
    val productModels = model.productModels

    // convert whiteList's string ID to integer index
    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
      set.map(model.itemStringIntMap.get(_)).flatten
    )

    // generate final blackList based on additional constraints
    val finalBlackList: Set[Int] = genBlackList(query = query)
      // convert seen Items list from String ID to integer Index
      .flatMap(x => model.itemStringIntMap.get(x))

    // look up user feature from model
    val userFeature =
      model.userStringIntMap.get(query.user).map { userIndex =>
        userFeatures.get(userIndex)
      }
      // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
      .flatten

    val topScores: Array[(Int, Double)] = if (userFeature.isDefined) {
      // the user has feature vector
      predictKnownUser(
        userFeature = userFeature.get,
        productModels = productModels,
        query = query,
        whiteList = whiteList,
        blackList = finalBlackList
      )
    } else {
      // the user doesn't have feature vector.
      // For example, new user is created after model is trained.
      logger.info(s"No userFeature found for user ${query.user}.")

      // check if the user has recent events on some items
      val recentItems: Set[String] = getRecentItems(query)
      val recentList: Set[Int] = recentItems.flatMap (x =>
        model.itemStringIntMap.get(x))

      val recentFeatures: Vector[Array[Double]] = recentList.toVector
        // productModels may not contain the requested item
        .map { i =>
          productModels.get(i).flatMap { pm => pm.features }
        }.flatten

      if (recentFeatures.isEmpty) {
        logger.info(s"No features vector for recent items ${recentItems}.")
        predictDefault(
          productModels = productModels,
          query = query,
          whiteList = whiteList,
          blackList = finalBlackList
        )
      } else {
        predictSimilar(
          recentFeatures = recentFeatures,
          productModels = productModels,
          query = query,
          whiteList = whiteList,
          blackList = finalBlackList
        )
      }
    }

    ...
  }
```

Note that the item IDs in top N results are the `Int` indices. You map them back to `String` with `itemIntStringMap` before they are returned.

```scala
  val itemScores = topScores.map { case (i, s) =>
    new ItemScore(
      // convert item int index back to string ID
      item = model.itemIntStringMap(i),
      score = s
    )
  }

  new PredictedResult(itemScores)
```

PredictionIO passes the returned `PredictedResult` object to *Serving*.

## Serving

The `serve` method of class `Serving` processes the predicted results. It is also
responsible for combining multiple predicted results into one if you have more
than one predictive model. *Serving* then returns the final predicted result.
PredictionIO will convert it to a JSON response automatically.

In MyECommerceRecommendation/src/main/scala/***Serving.scala***,

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json,
`PredictedResult` from all models will be passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

> An engine can train multiple models if you specify more than one Algorithm
component in `object ECommerceRecommendationEngine` inside ***Engine.scala***. Since only
one `ECommAlgorithm` is implemented by default, this `Seq` contains one element.
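If you do add a second algorithm, `serve` has to merge the results. One possible strategy, shown here only as a sketch on local case classes, is to sum the scores of each item across all results and keep the top items. The `combine` helper and the summing rule are assumptions for illustration, not part of the template:

```scala
object CombineServingSketch {
  case class ItemScore(item: String, score: Double)
  case class PredictedResult(itemScores: Array[ItemScore])

  // Sum the scores per item across all results and keep the top `num` items.
  def combine(results: Seq[PredictedResult], num: Int): PredictedResult = {
    val summed = results
      .flatMap(_.itemScores.toSeq)
      .groupBy(_.item)
      .map { case (item, scores) => ItemScore(item, scores.map(_.score).sum) }
      .toArray
      .sortBy(s => -s.score)
      .take(num)
    PredictedResult(summed)
  }

  val fromFirstModel = PredictedResult(
    Array(ItemScore("i1", 1.0), ItemScore("i2", 0.5)))
  val fromSecondModel = PredictedResult(
    Array(ItemScore("i2", 1.0), ItemScore("i3", 0.25)))

  // i2 (0.5 + 1.0) now ranks ahead of i1 (1.0); i3 is cut off.
  val combined = combine(Seq(fromFirstModel, fromSecondModel), num = 2)
}
```

Summing only makes sense when the algorithms produce scores on comparable scales; otherwise you may want to normalize or rank-merge instead.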
